查看原文
其他

集成才是硬道理! 用它构建一个完整的Hadoop

ApacheHudi 2022-04-23

The following article is from 小猴学Java Author 小猴




# 本章主题

Hudi典型应用场景

Hudi对比Kudu、Hive、Hbase

Hudi对Streaming的支持

Hudi核心概念

Hudi核心API

1

应用场景

1


高效数据采集

我们构建Data Lake需要从各种各样的数据源将数据抽取到Hadoop中,例如:一些网站流量日志、关系型数据库、IOT等等。而目前,一个大数据系统中,会采用各种各样的不同采集工具来进行数据采集。比较常见的:使用Flume采集日志数据、使用Sqoop/DataX将RDBMS的数据全量或者增量导入、通过Canel将MySQL中的数据实时摄取、通过Oracle Golden Gate将Oracle的数据实时摄取。但抽取的数据源很多,不得不维护多套不同架构的采集系统。

而Hudi提供的Upserts操作可以更易维护地采集RDBMS、以及向HBase/Cassandra这样的NoSQL或者是Kafka这样的消息队列。


2

ALL IN ONE近实时数据分析

在建立企业级的数据系统时,如果要构建实时的数据集市,可以采用Apache Druid、MemSQL或者是OpenTSDB来提供支持,这种方式对于较小的数据量的查询(例如:系统系统、或者交互式实时分析)是没有问题的。而如果Hadoop上的数据过时了,但供实时数据集市的这部分的过期数据仍然保留着,这些数据很少再被交互式查询。这样会导致资源利用率不足,造成硬件资源的浪费。

在Hadoop上基于Presto和Spark SQL的交互式SQL查询,可以在几秒内完成查询。Apache Hudi,可以在几分钟内完成数据的更新,它可以对多个存储在DFS上的大表进行分析计算。Apache Hudi没有外部依赖(例如:要使用Kylin就得有一套专门用于sub-second的HBase集群),所以使用Apache Hudi可以在不增加额外开销的情况下,对新数据进行更快的查询分析。


3

增量式数据处理管道

传统的数据流处理,大都都是基于DAG的工作流,基于Hadoop上形成一条数据管道。在传统的数据仓库中,一般使用一个新的HDFS文件夹、或者是一个Hive分区来组织数据。例如:通过sqoop,可以以每个小时为单位生成一个Hive分区。并在每个小时结束的时候,将数据同步到HDFS。而下游的任务,等到数据提取完后,继续调度执行。如果下游任务的计算需要1个小时,这一共就是2个小时以上的延迟。

而在一些移动互联网和物联网应用中,数据是不断上传的。而且在传输数据过程中,也会出现数据延迟到达的情况,为了保证数据的正确性,每个小时都需要重跑最后几个小时的数据。为了解决数据的延迟去重跑数据校验结果,会严重影响系统的性能。大家可以想象以下,如果有很多的工作流,每个小时都需要重跑TB级的数据,其影响有多大不言而喻。这也是大部分批处理系统的问题所在:数据的粒度是以分区为单位的。


Hudi提供了以record为粒度(不再是以分区或者文件夹为粒度),来处理上游Hudi表中的新数据。并且可以Update延迟的数据,并且可以支持更高频率的连续调度,例如:15分钟、5分钟。

4


从容应对数据冗余

HDFS有一个经典场景就是把数据文件进行物理切分为block,再将block分布到集群中。接着就可以供应用程序使用了。举一个关于滴滴出行的例子,通过建立一个基于Spark的pipeline,然后从Hadoop存储的日志中找到紧急刹车的事件,然后把这些紧急刹车的事件装载到ElasticSearch中,其他应用通过读取、分析、处理ElasticSearch上的数据,供安全驾驶策略、系统使用。而每天都有大量用户在使用滴滴出行,为了避免写入大量的事件压垮ElasticSearch(服务存储),典型的做法是先写入到消息队列,比较流行地做法就是选择Kafka,然后再消费Kafka中的数据,写入到ElasticSearch。这种做法在业界目前使用很普遍,但是有个问题,Kafka和DFS会冗余大量数据。

而Hudi将每个管道的Spark Pipeline Upsert输出放入到Hudi表中,然后对表进行增量跟踪(用起来就像Kafka主题一样)来获取新的数据,再并入到ElasticSearch(服务存储)中。

2

对比其他经典组件

Apache Hudi填补了在分布式文件系统处理数据的空白,它可以很好地和DFS共存。但在做技术选型时,需要对各种方案有所了解,这样才能权衡利弊,找到适合当前业务场景的技术方案。


1

Apache Kudu


Kudu是一个存储系统,它的设计目标是旨在能够支持Upsert,还能够支持对PB级的数据规模进行实时分析。Kudu好有理想哦,还尝试能够充当OLTP  workload的存储。而Hudi并没有尝试这样。

Kudu与HDFS完全不同,Hudi旨在和DFS一起使用(例如:HDFS、S3等),Hudi并没有自己的存储服务器,它借助于DFS存储数据,并借助于Spark来完成计算。所以,Hudi是一套兼容、集成的方案,能够更好的融入到Hadoop生态和Spark生态。而Kudu就不一样了,我们需要自己构建一套Kudu的集群,很多时候要买新的硬件资源。


2

Hive事务

Hive 0.13提供了完整的ACID特性,并支持事务。Hive尝试在基于ORC的文件格式基础上,来实现merge-on-read(后面会讲到)的存储。Hive事务是必须基于Hive的Metastore之上启动的Hive任务或者查询实现的,而Hudi可以充分利用Spark处理框架的所有功能。Hudi作为库嵌入到Spark中,实现和维护起来会容易很多。Hudi的这种设计,是可以在可以完全脱离Hive的计算引擎(例如:Presdo、Spark)使用,将来还会提供除了parquet之外更多的文件格式支持。

3


HBase

尽管HBase是一个为OLTP workload设计的key-value存储系统,因为HBase和Hadoop挨着,所以很多人把HBase和分析关联在一起,HBase对写操作进行了大量优化,可以支持亚秒级更新。而Hive-on-HBase以及Phoenix,让可以用户用SQL地方式查询数据。


但根据在分析上的workload性能表现,混合式的列式存储(例如:Parquet/ORC)要比HBase的StoreFile要好得多,因为分析主要的性能压力都是来自于大量数据的读取,而不是写入。


而Hudi的出现,弥补了这一块缺憾。有了Hudi,可以减少数据与分析性存储格式的差距。HBase并不像Hudi一样,提供了支持提交时间、增量拉取之类的增量处理primitive(原语)。

3

Hudi对流处理的支持

Hudi是可以与目前的批处理、和流处理集成,并将计算结果存储在Hadoop中。对于Spark来说就比较简单些,直接把Hudi的库和Spark/Spark Streaming整合。对于Flink,可以先在Flink进行数据处理,然后将处理后的数据推入到Kafka中或者HDFS中间文件,让导入到Hudi的表中。所以Hudi相当于是一种Source或者Sink。相信不久的将来,Flink-hudi-connector就会到来。

大家可以看一下这个:

https://hudi.apache.org/blog/apache-hudi-meets-apache-flink/

4

Hudi核心概念


Hudi基于HDFS提供了以下两类primitive:

1、Update/Delete

2、Change Streams——数据被Update的事件流


1

Timeline

Hudi会自动维护不同时间对表的所有操作。

这样,我们可以即时查询任意时间的数据。


Hudi instant包含了一下几个组件:

1、Instant action:在表上执行的操作类型

2、Instant time:操作时间(例如:20201220013122),时间戳是单调递增的

3、state:instant的当前状态


Hudi执行的关键操作包含有6个部分:

1、COMMIT:将一批原子写入原子操作提交到表中

2、CLEANS:通过后台启动删除表中不再需要用到的旧文件

3、DELTA_COMMIT:增量提交,将一批记录原子写入到Merge-On-Read类型的表中。

4、COMPACTION:通过后台启动进行数据压缩。例如:将基于行存储的数据转换为列存储格式。在HUDI内部,COMPACTION是timeline上的特殊的COMMIT

5、ROLLBACK:当提交时出现问题进行回滚,删除在写期间产生的文件

6、SAVEPOINT:将一些文件标记为saved,这样后台删除程序就不再会清理他们。在出现故障或者数据恢复时,可以将数据还原到timeline的某个时间点。


Hudi在任意的时间是以下三种状态之一:

1、REQUESTED:已经调度该操作,但还未启动

2、INFLIGHT:表示正在执行操作

3、COMPLETE:表示在timeline上完成了操作

上面这幅图是根据Hudi官网重制的。我们可以看到:

1、看一下Timeline,包含了从10:00到10:20发生的所有更新事件,大约5分钟一次

2、Hudi会在时间轴上保留提交、COMPACTION以及CLEAN的元数据

3、Hudi数据的组织反映了数据的事件时间


Hudi每一次提交都会在Timeline有一项记录,并将数据以不同的时间段划分,但有延迟的数据进来,可以继续更新。所以在数据消费端,总是能够看到最新的数据。延迟的数据通过Upsert将更新的数据生成到之前的时间段的目录中。通过Timeline,可以获取从10:00开始成功提交的所有数据的增量查询。


2

文件管理

Hudi中的表其实就是HDFS指定路径下的目录结构。与Hive非常类似,一个Hudi表可以有多个分区,分区也是以文件夹组织的。每个分区都有相对于基本路径的唯一标识。

在分区中,文件以group方式组织,每个group都有自己的ID。每个group可以包含几个文件切片,每个切片被提交后生成parquet文件。

3


索引

Hudi通过索引机制,提供高效地Upsert操作。索引是基于给定的hoodie key(record的key + 分区路径)映射到文件id。一旦record的第一个版本写入到文件中,这个用于索引的hoodie key就不会再变了。索引其实就是包含一组record的所有版本的映射。


4

表类型与查询类型

Hudi的表类型定义了数据如何在HDFS中进行索引、存储目录以及primitive、和timeline的各类操作(如何写入数据)。而查询类型定义了读取数据。以下是Hudi支持的表类型与查询类型。


1、Copy on write表类型:支持快照查询、增量查询

2、Merge on read表类型:支持快照查询、增量查询、读取优化查询。

核心概念:Copy on write和Merge on read

Copy on write类型的表,仅使用列式文件格式存储数据(当前是parquet文件格式)。在写入数据文件过程中,同时进行数据合并,更新文件版本并重写文件。

Merge on read类型的表,会使用列式(parquet)和行式(avro)文件格式来存储数据。数据的更新会记录到增量文件中,然后以同步或者异步地方式生成新版本的文件。这种方式可以让提交的频率更低。而关键在于Compaction策略。(有点类似HBase),这种方式可以有效避免写放大。

对比这两种不同类型的表:


5

核心概念:查询类型


Snapshot Queries(快照查询)

这种查询方式可以某个commit或者compaction的最新快照。如果是merge-on-read类型的表,会动态地将最新的文件切片和增量文件合并来提供近实时的数据(几分钟)。如果是copy-on-write类型表,可以直接从已经存在的parquet类型的表中查询数据,并同时提供查询从其他side write写入的功能。


Incremental Queries(增量查询)

Hudi提供非常方便的基于时间戳的增量查询。在Hudi中,只有一个提交和Compact已经完成,查询才能看到写入表的新数据。增量查询可以有效地提供change stream。


Read optimized Queries(读优化查询)

这种查询可以查看到提交/压缩后的最新快照。这种方式仅仅读取已经完成的切片数据文件(parquet文件),并保证与列式查询同样性能。


同样,也来对比一下这两类查询:

增量查询到的数据是低延迟的,因为它会将增量日志的数据合并在一起返回。而Read Optimized查询的延迟性较高,因为它不进行合并读。而增量查询查询延迟较高,是因为每次都要合并查询,而Read Optimized查询无需合并,直接将parquet文件查询返回。


5

Hudi核心API(on spark)

了解了Hudi的核心概念之后。我们就可以开心地使用Spark啦。本次,我们来实现Hudi的6个最基础的操作:

1、插入数据

2、查询数据

3、更新数据

4、增量查询数据

5、指定时间查询

6、删除数据


1

项目准备工作

导入Spark依赖和Hudi依赖。本次使用的spark版本为2.4.7、scala版本为2.12、hadoop为2.7.5、hudi版本为0.6。

<repositories> <repository> <id>nexus-aliyun</id> <name>nexus-aliyun</name> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>false</enabled> </snapshots> </repository></repositories>
<properties> <spark-version>2.4.7</spark-version> <scala-version>2.12</scala-version> <hadoop-version>2.7.5</hadoop-version> <hudi-version>0.6.0</hudi-version></properties>
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala-version}</artifactId> <version>${spark-version}</version> </dependency>
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${scala-version}</artifactId> <version>${spark-version}</version> </dependency>
<dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.1</version> </dependency>
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop-version}</version> </dependency>
<dependency> <groupId>org.apache.hudi</groupId> <artifactId>hudi-spark-bundle_${scala-version}</artifactId> <version>${hudi-version}</version> </dependency>
<dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.11</version> </dependency>
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-avro_${scala-version}</artifactId> <version>${spark-version}</version> </dependency></dependencies>

  

2

准备订单生成器

为了方便测试,我们来编写一个订单的数据生成器。其中,订单包含以下几个字段:

1、订单ID(使用UUID生成)

2、订单价格

3、订单提交日期

4、订单状态

5、订单商品数量


// 订单实体类case class Order(id: String, price: BigDecimal, commit_date: Long, status: Int, count: Int, dt:String)
// 生成指定数量的订单def orderGen(num: Int) = { val r = new Random() val sdf = new SimpleDateFormat("yyyy-MM-dd")
(0 to num).map(n => { val now = new Date() Order(UUID.randomUUID().toString, BigDecimal.decimal(r.nextFloat() * 1000), now.getTime, r.nextInt(6), r.nextInt(10) + 1, sdf.format(now)) }).toList}


3

Insert Data

将DataFrame数据导入到Hudi需要留意这及格配置:

1、INSERT/DELETE的并行度配置:

hoodie.insert.shuffle.parallelism

hoodie.upsert.shuffle.parallelism

这里使用的是QuickStart的配置,两个并行度都是2


2、表名,此处指的是元数据中的表名。注意,Hudi并不会为表自动生成目录,需要我们在path中指定表存放的位置。

HoodieWriteConfig.TABLE_NAME


3、表类型,默认为COPY_ON_WRITE

DataSourceWriteOptions.TABLE_TYPE_OPT_KEY


4、记录的key

DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY


5、如果有记录出现一样的key,会选择提交预聚合配置的字段值最大的那个

DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY


6、配置用于分区的字段(可以指定多个)

DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY


7、配置是否采用Hive的分区组织方式

DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY


注意:调用save的时候,需要指定把表的存放位置放进去。


更多的配置参考:

https://hudi.apache.org/docs/configurations.html#write-options


// 1. 构建Spark运行环境val spark = SparkSession .builder() .master("local[*]") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.default.parallelism", 4) .appName("Load order data") .getOrCreate()
// 2. 生成10000个订单,并转换为DataFrameval orderDF = spark.createDataFrame(orderGen(10000))
// 3. 写入到Hudi// 可以在https://hudi.apache.org/docs/configurations.html中找到详细配置orderDF.write.format("hudi") // 配置Hudi insert和update的并行度为2,可以通过hudi客户端传入 .options(getQuickstartWriteConfigs) // 配置表名【必须】 .option(HoodieWriteConfig.TABLE_NAME, "tbl_order") // 配置表类型【默认为COPY_ON_WRITE】 .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE") // 配置record的key【默认uuid】 .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") // 配置预聚合的字段【默认ts】 // 如果两个record的key相同时,会取该字段指定的最大的那个 .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "commit_date") // 指定分区字段【默认partitionpaht】 .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt") // 指定是否采用类似Hive的分区组织方式【默认:false】 .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, true) .mode(SaveMode.Append) .save("/hudi/tbl_order")


4

Query Data

Hudi查询数据非常方便,Spark直接read就可以了。只需要指定从什么位置读取数据即可。因为之前创建的表只有一个层级的分区,所以读取的时候使用的是/hudi/*,如果是多级分区可以使用/hudi/*/*…


// 1. 构建Spark运行环境val spark = SparkSession .builder() .master("local[*]") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.default.parallelism", 4) .appName("Load order data") .getOrCreate()
spark.read.format("hudi") .load("/hudi/tbl_order/*") .createOrReplaceTempView("tbl_order");
// 查看tbl_order的数据spark.sql("select * from tbl_order").show()spark.sql("select status, count(id) from tbl_order where dt = '2021-01-02' group by status").show()


执行结果如下:

查询2021-01-02的数据并按照状态进行分组聚合 +------+---------+ | 1| 168464| | 3| 168263| | 5| 168218| | 4| 168764| | 2| 167997| | 0| 168296| +------+---------+


5

Update Data

 

更新数据和插入数据非常像。因为默认OPERATION_OPT_KEY配置的是:UPSERT,是支持插入和更新的。我们只需要将需要更新的数据直接写入到Hudi即可,但必须要将record key带上。


// 1. 构建Spark运行环境val spark = SparkSession .builder() .master("local[*]") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.default.parallelism", 4) .appName("Load order data") .getOrCreate()
// 2. 获取所有状态为0的数据,并将状态更新为1spark.read.format("hudi") .load("/hudi/tbl_order/*") .createOrReplaceTempView("tbl_order")
val updateDF = spark.sql("select id, price, commit_date, 1 as status, count, dt from tbl_order where status = 0")
// 3. 写入到Hudi// 可以在https://hudi.apache.org/docs/configurations.html中找到详细配置updateDF.write.format("hudi") // 配置Hudi insert和update的并行度为2,可以通过hudi客户端传入 .options(getQuickstartWriteConfigs) // 配置表名【必须】 .option(HoodieWriteConfig.TABLE_NAME, "tbl_order") // 配置表类型【默认为COPY_ON_WRITE】 .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE") // 配置record的key【默认uuid】 .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") // 配置预聚合的字段【默认ts】 // 如果两个record的key相同时,会取该字段指定的最大的那个 .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "commit_date") // 指定分区字段【默认partitionpaht】 .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt") // 指定是否采用类似Hive的分区组织方式【默认:false】 .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, true) .mode(SaveMode.Append) .save("/hudi/tbl_order")


5

Incremental Query Data

增量查询需要指定一个读取参数:BEGIN_INSTANTTIME_OPT_KEY,通过该配置我们可以查询出来某个时间点数据写入Hudi的数据。


// 1. 构建Spark运行环境val spark = SparkSession .builder() .master("local[*]") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.default.parallelism", 4) .appName("Load order data") .getOrCreate()
spark.read.format("hudi") // 指定进行增量查询 .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) // 指定查询从2021年1月2号18点之后提交的数据 .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20210102180000") .load("/hudi/tbl_order/*") .createOrReplaceTempView("tbl_order_incremental");
spark.sql("select * from tbl_order_incremental").show()spark.sql("select count(*) from tbl_order_incremental").show()


6

Point in time Query Data

指定时间范围查询和incremental query类似,只需要指定一个END_INSTANTTIME_OPT_KEY即可。

spark.read.format("hudi") // 指定进行增量查询 .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) // 指定查询从2021年1月2号18点之后提交的数据 .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20210102183100") .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, "20210102183126") .load("/hudi/tbl_order/*") .createOrReplaceTempView("tbl_order_incremental");
spark.sql("select * from tbl_order_incremental").show()spark.sql("select count(*) from tbl_order_incremental").show()


7

Delete Data

Hudi支持两种方式来删除数据,一种是软删除、另一种是硬删除(物理删除)。

软删除的方式就是保留record的key,将其他字段设置为null。

硬删除就是物理删除,将record的数据从表中删除。有三种方式:

  • 设置OPERATION_OPT_KEY 为DELETE_OPERATION_OPT_VAL

  • 设置PAYLOAD_CLASS_OPT_KEY

    org.apache.hudi.EmptyHoodieRecordPayload。此操作将会在DataSet提交时移除所有的数据。

  • 在DataSource中添加名为_hoodie_is_deleted列到DataSet中,将要删除的record这一列设置为true。对于要重新upsert的数据,把这列设置为false或者null即可。


删除数据只需要将Hoodie record的key传入即可。操作方式与INSERT/UPDATE类似。只是需要将OPERATION_OPT_KEY参数设置为delete即可。

// 1. 构建Spark运行环境val spark = SparkSession .builder() .master("local[*]") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.default.parallelism", 4) .appName("Load order data") .getOrCreate()
val hudiDF = spark.read.format("hudi") .load("/hudi/tbl_order/*")
hudiDF.createOrReplaceTempView("tbl_order")
spark.sql("select count(id) from tbl_order").show(100)
val deleteAllDF = spark.sql("select id from tbl_order")
// 3. 从Hudi中删除所有数据// 可以在https://hudi.apache.org/docs/configurations.html中找到详细配置deleteAllDF.write.format("hudi") // 配置Hudi insert和update的并行度为2,可以通过hudi客户端传入 .options(getQuickstartWriteConfigs) // 配置删除【默认为upsert】 // DELETE_OPERATION_OPT_VAL为软删除 // DELETE_OPERATION_OPT_VAL为软删除 // .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL) // 配置表名【必须】 .option(HoodieWriteConfig.TABLE_NAME, "tbl_order") // 配置record的key【默认uuid】 .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id") // 配置预聚合的字段【默认ts】 // 如果两个record的key相同时,会取该字段指定的最大的那个 .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "commit_date") // 指定分区字段【默认partitionpaht】 .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt") // 指定是否采用类似Hive的分区组织方式【默认:false】 .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, true) .mode(SaveMode.Append) .save("/hudi/tbl_orde")
spark.sql("select * from tbl_order").show()


# 总结

Hudi通过集成Hadoop、Spark,而且操作方式非常简洁,未来可期,一定会受到越来越多人的关注。能够基于现有的一套解决方案,为什么要引入更多的复杂、资源、架构呢?

未来可期,Hudi!

以上


参考文献:

https://hudi.apache.org/docs/use_cases.html

https://hudi.apache.org/docs/concepts.html#copy-on-write-table

https://hudi.apache.org/docs/quick-start-guide.html

推荐阅读


重磅!华为云湖仓一体FusionInsight集成Apache Hudi

数据湖风暴来袭!阿里云EMR重磅发布Apache Hudi

Lakehouse: 统一数据仓库和高级分析的新一代开放平台

Hi, Data Lakers!这里有一份来自PMC Chair的新年礼包,请注意查收!

数据湖框架选型很纠结?一文了解Apache Hudi核心优势

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存